Wiring It All Together: The Application Entry Point
February 2, 2026
We've built all the pieces of our distributed search cluster:
- TF-IDF algorithm for document scoring
- HTTP networking layer for communication
- Search workers and coordinators
- ZooKeeper-based leader election and service discovery
- Role transition handling
Now it's time to wire everything together into a complete, production-ready application.
The Challenge
Our main application needs to:
- Parse command-line arguments for configuration
- Connect to ZooKeeper with proper error handling
- Initialize service registries
- Set up leader election and role transitions
- Handle graceful shutdown on SIGINT/SIGTERM
Let's build it step by step.
Configuration
First, we define our configuration structure and defaults:
```go
const (
    defaultPort           = 8080
    defaultZkAddress      = "localhost:2181"
    defaultSessionTimeout = 5 * time.Second
    defaultDocumentsDir   = "resources/books"
)

type Config struct {
    Port           int
    ZkAddress      string
    SessionTimeout time.Duration
    DocumentsDir   string
}
```
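These defaults assume a local ZooKeeper and a documents directory relative to the working directory. It can be worth failing fast on an obviously broken configuration before we touch ZooKeeper. Here is a minimal sketch; the `Validate` method is an assumption added for illustration, not part of the project code:

```go
// Sketch only: basic sanity checks run before connecting to ZooKeeper.
// Validate is an assumed helper, not part of the original code.
func (c *Config) Validate() error {
    if c.Port < 1 || c.Port > 65535 {
        return fmt.Errorf("invalid port: %d", c.Port)
    }
    if _, err := os.Stat(c.DocumentsDir); err != nil {
        return fmt.Errorf("documents directory %q: %w", c.DocumentsDir, err)
    }
    return nil
}
```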
Command-Line Parsing
Go's flag package makes argument parsing straightforward:
```go
func parseFlags() *Config {
    config := &Config{}

    flag.IntVar(&config.Port, "port", defaultPort, "HTTP server port")
    flag.StringVar(&config.ZkAddress, "zk", defaultZkAddress, "ZooKeeper address")
    flag.DurationVar(&config.SessionTimeout, "timeout", defaultSessionTimeout, "ZooKeeper session timeout")
    flag.StringVar(&config.DocumentsDir, "docs", defaultDocumentsDir, "Directory containing documents")

    help := flag.Bool("help", false, "Show help message")
    flag.BoolVar(help, "h", false, "Show help message (shorthand)")

    flag.Parse()

    if *help {
        printUsage()
        os.Exit(0)
    }

    return config
}
```
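`parseFlags` calls a `printUsage` helper that isn't shown in this post. A minimal sketch, whose exact wording is our own assumption, can simply delegate to `flag.PrintDefaults`:

```go
// Sketch of the printUsage helper referenced above; the wording is an assumption.
func printUsage() {
    fmt.Fprintf(os.Stderr, "Usage: %s [options]\n\nOptions:\n", os.Args[0])
    flag.PrintDefaults()
}
```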
This gives us a flexible CLI:
```bash
# Start with defaults
./node

# Custom port and ZooKeeper address
./node -port 8081 -zk zookeeper.example.com:2181

# Custom documents directory
./node -docs /path/to/documents
```
ZooKeeper Connection
Connecting to ZooKeeper requires handling the asynchronous nature of the client:
```go
func connectToZooKeeper(address string, timeout time.Duration) (*zk.Conn, error) {
    servers := []string{address}

    conn, eventChan, err := zk.Connect(servers, timeout)
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %w", err)
    }

    // Handle ZooKeeper events in background
    go handleZkEvents(eventChan)

    // Wait for connection to be established
    for i := 0; i < 10; i++ {
        state := conn.State()
        if state == zk.StateHasSession {
            return conn, nil
        }
        if state == zk.StateAuthFailed {
            conn.Close()
            return nil, fmt.Errorf("authentication failed")
        }
        time.Sleep(500 * time.Millisecond)
    }

    return conn, nil
}
```
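Polling `conn.State()` is simple and works. As a design alternative, the client also reports state changes on the event channel, so the wait can be event-driven. The sketch below is an assumption rather than code from the project, and because it consumes `eventChan` itself it would replace, not accompany, handing the channel to `handleZkEvents`:

```go
// Sketch: wait for the session by reading the event channel instead of polling.
// waitForSession is an assumed helper; it consumes events from eventChan.
func waitForSession(conn *zk.Conn, eventChan <-chan zk.Event, timeout time.Duration) error {
    deadline := time.After(timeout)
    for {
        select {
        case event := <-eventChan:
            switch event.State {
            case zk.StateHasSession:
                return nil
            case zk.StateAuthFailed:
                conn.Close()
                return fmt.Errorf("ZooKeeper authentication failed")
            }
        case <-deadline:
            return fmt.Errorf("timed out waiting for ZooKeeper session")
        }
    }
}
```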
The event handler logs connection state changes:
```go
func handleZkEvents(eventChan <-chan zk.Event) {
    for event := range eventChan {
        switch event.State {
        case zk.StateConnected:
            log.Printf("ZooKeeper: Connected")
        case zk.StateHasSession:
            log.Printf("ZooKeeper: Session established")
        case zk.StateDisconnected:
            log.Printf("ZooKeeper: Disconnected (will auto-reconnect)")
        case zk.StateExpired:
            log.Printf("ZooKeeper: Session expired - node should restart")
        case zk.StateAuthFailed:
            log.Printf("ZooKeeper: Authentication failed")
        }
    }
}
```
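`StateExpired` deserves more than a log line: once the session expires, every ephemeral znode we created (our election candidacy and registry entries) is gone, so the safest recovery is usually to exit and let a supervisor restart the node. Here is a hedged sketch of how the handler could surface expiry to `main`; the `expired` channel and the function name are assumptions:

```go
// Sketch: surface session expiry to main so the process can exit and be restarted.
// The expired channel and this function are assumptions, not part of the original code.
func handleZkEventsWithExpiry(eventChan <-chan zk.Event, expired chan<- struct{}) {
    for event := range eventChan {
        log.Printf("ZooKeeper state change: %v", event.State)
        if event.State == zk.StateExpired {
            close(expired)
            return
        }
    }
}
```

`main` would then select on `expired` alongside the shutdown signal channel and exit in either case.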
The Main Function
Here's where everything comes together:
```go
func main() {
    config := parseFlags()

    log.Printf("Starting distributed search cluster node")
    log.Printf("Configuration: port=%d, zk=%s, timeout=%v, docs=%s",
        config.Port, config.ZkAddress, config.SessionTimeout, config.DocumentsDir)

    // Step 1: Connect to ZooKeeper
    zkConn, err := connectToZooKeeper(config.ZkAddress, config.SessionTimeout)
    if err != nil {
        log.Fatalf("Failed to connect to ZooKeeper: %v", err)
    }
    defer zkConn.Close()

    // Step 2: Initialize service registries
    workersRegistry, err := cluster.NewZkServiceRegistry(
        zkConn, cluster.WorkersRegistryPath)
    if err != nil {
        log.Fatalf("Failed to create workers registry: %v", err)
    }

    coordinatorsRegistry, err := cluster.NewZkServiceRegistry(
        zkConn, cluster.CoordinatorsRegistryPath)
    if err != nil {
        log.Fatalf("Failed to create coordinators registry: %v", err)
    }

    // Step 3: Create role transition handler
    electionAction := app.NewOnElectionAction(
        workersRegistry,
        coordinatorsRegistry,
        config.Port,
        config.DocumentsDir,
    )

    // Step 4: Set up leader election
    leaderElection, err := cluster.NewLeaderElection(zkConn, electionAction)
    if err != nil {
        log.Fatalf("Failed to create leader election: %v", err)
    }

    if err := leaderElection.VolunteerForLeadership(); err != nil {
        log.Fatalf("Failed to volunteer for leadership: %v", err)
    }

    // Step 5: Run initial election
    if err := leaderElection.ReelectLeader(); err != nil {
        log.Fatalf("Failed to run initial election: %v", err)
    }

    log.Printf("Node started, current role: %s", electionAction.GetCurrentRole())

    // Step 6: Wait for shutdown signal
    shutdown := make(chan os.Signal, 1)
    signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

    sig := <-shutdown
    log.Printf("Received signal %v, initiating graceful shutdown...", sig)

    // Step 7: Graceful shutdown
    if err := electionAction.Stop(); err != nil {
        log.Printf("Error during shutdown: %v", err)
    }

    log.Printf("Node shutdown complete")
}
```
Signal Handling
Proper signal handling is crucial for production systems:
```go
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

sig := <-shutdown
```
This catches:
- SIGINT (Ctrl+C)
- SIGTERM (sent by process managers like systemd or Kubernetes)
When a signal is received, we:
- Stop the HTTP server gracefully
- Unregister from service registries
- Close the ZooKeeper connection
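If any of those steps stalls (a stuck HTTP connection, an unreachable ZooKeeper), the process would never exit. One option is to race `Stop()` against a grace period; the sketch below is illustrative and the 10-second timeout is an assumption:

```go
// Sketch: bound the graceful shutdown with a grace period (duration is an assumption).
done := make(chan struct{})
go func() {
    if err := electionAction.Stop(); err != nil {
        log.Printf("Error during shutdown: %v", err)
    }
    close(done)
}()

select {
case <-done:
    log.Printf("Node shutdown complete")
case <-time.After(10 * time.Second):
    log.Printf("Graceful shutdown timed out, exiting anyway")
}
```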
The Startup Sequence
Here's what happens when a node starts:
```
┌────────────────────────────────────────────────────────────┐
│                        Node Startup                        │
├────────────────────────────────────────────────────────────┤
│ 1. Parse command-line arguments                            │
│    └─ port=8080, zk=localhost:2181, docs=resources/books   │
│                                                            │
│ 2. Connect to ZooKeeper                                    │
│    └─ Wait for session establishment                       │
│                                                            │
│ 3. Initialize registries                                   │
│    ├─ /workers_service_registry                            │
│    └─ /coordinators_service_registry                       │
│                                                            │
│ 4. Create OnElectionAction                                 │
│    └─ Handles role transitions                             │
│                                                            │
│ 5. Create LeaderElection                                   │
│    └─ Creates /election znode if needed                    │
│                                                            │
│ 6. Volunteer for leadership                                │
│    └─ Creates ephemeral sequential znode /election/c_XXXXX │
│                                                            │
│ 7. Run election                                            │
│    ├─ If smallest → OnElectedToBeLeader()                  │
│    └─ Otherwise → OnWorker() + watch predecessor           │
│                                                            │
│ 8. Block waiting for shutdown signal                       │
└────────────────────────────────────────────────────────────┘
```
Testing the Application
We test the configuration parsing in isolation:
```go
func TestParseFlags_Defaults(t *testing.T) {
    flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
    os.Args = []string{"node"}

    config := parseFlags()

    if config.Port != defaultPort {
        t.Errorf("expected port %d, got %d", defaultPort, config.Port)
    }
    if config.ZkAddress != defaultZkAddress {
        t.Errorf("expected zk %s, got %s", defaultZkAddress, config.ZkAddress)
    }
}

func TestParseFlags_CustomValues(t *testing.T) {
    flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
    os.Args = []string{
        "node",
        "-port", "9090",
        "-zk", "zk.example.com:2181",
        "-timeout", "10s",
        "-docs", "/custom/docs",
    }

    config := parseFlags()

    if config.Port != 9090 {
        t.Errorf("expected port 9090, got %d", config.Port)
    }
    if config.SessionTimeout != 10*time.Second {
        t.Errorf("expected timeout 10s, got %v", config.SessionTimeout)
    }
}
```
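Both tests mutate package-level state (`os.Args` and `flag.CommandLine`), which is fine for a small suite but easy to trip over as it grows. A small helper, sketched here with an assumed name, keeps that mutation contained and restores the globals afterwards:

```go
// Sketch: contain the mutation of os.Args and flag.CommandLine in one place.
// withArgs is an assumed helper, not part of the original test file.
func withArgs(t *testing.T, args []string, fn func()) {
    t.Helper()
    oldArgs, oldFlags := os.Args, flag.CommandLine
    defer func() {
        os.Args, flag.CommandLine = oldArgs, oldFlags
    }()
    flag.CommandLine = flag.NewFlagSet(args[0], flag.ExitOnError)
    os.Args = args
    fn()
}
```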
Running the Cluster
To run a 3-node cluster locally:
```bash
# Terminal 1: Start ZooKeeper
docker run -p 2181:2181 zookeeper

# Terminal 2: Start first node
./node -port 8080

# Terminal 3: Start second node
./node -port 8081

# Terminal 4: Start third node
./node -port 8082
```
You'll see output like:
```
# Node 1 (becomes leader)
Starting distributed search cluster node
Configuration: port=8080, zk=localhost:2181, timeout=5s, docs=resources/books
Successfully connected to ZooKeeper at localhost:2181
Initialized service registries
Volunteered for leadership with znode: c_0000000001
Elected as leader with znode: c_0000000001
Transitioning to LEADER role on port 8080
Successfully transitioned to LEADER role, serving at http://localhost:8080/search
Node started, current role: leader
```

```
# Node 2 (becomes worker)
Starting distributed search cluster node
Configuration: port=8081, zk=localhost:2181, timeout=5s, docs=resources/books
Successfully connected to ZooKeeper at localhost:2181
Initialized service registries
Volunteered for leadership with znode: c_0000000002
Not leader. Watching predecessor: c_0000000001
Transitioning to WORKER role on port 8081
Successfully transitioned to WORKER role
Node started, current role: worker
```
Graceful Shutdown
When you press Ctrl+C:
```
^C
Received signal interrupt, initiating graceful shutdown...
Stopping OnElectionAction
Stopping HTTP server on port 8080
Unregistered from cluster: /coordinators_service_registry/n_0000000001
Node shutdown complete
```
The other nodes will detect the leader's departure and trigger re-election automatically.
Production Considerations
For production deployments, consider:
- Health checks: Add a /health endpoint for load balancers (a minimal sketch follows this list)
- Metrics: Export Prometheus metrics for monitoring
- Logging: Use structured logging (e.g., zerolog, zap)
- Configuration: Support environment variables and config files
- Containerization: Create a Dockerfile for easy deployment
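As a taste of the first item, a health endpoint can be a few lines on the node's HTTP mux. The sketch below is an assumption: the `mux` variable, the `net/http` wiring, and the JSON shape are not part of the code from earlier posts:

```go
// Sketch of a minimal /health endpoint; mux and the JSON body are assumptions.
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    fmt.Fprintf(w, `{"status":"ok","role":"%s"}`, electionAction.GetCurrentRole())
})
```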
What's Next?
In the final post, we'll run the complete test suite and verify all 12 correctness properties pass.
Get the Code
```bash
git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed_doc_search
git checkout 07-application-entry-point
go build ./cmd/node
./node -help
```
This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.